The pipeline takes a long time when run over all committee sessions.

To speed things up, limit the run to a subset of sessions, with caching enabled, by adding a filter step to the kns_documentcommitteesession pipeline:

  additional-steps:
  - run: filter
    cache: true
    parameters:
      in:
      - CommitteeSessionID: 2063122
      - CommitteeSessionID: 2063126
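The `in` filter above keeps only rows whose CommitteeSessionID matches one of the listed values. In plain Python the semantics are roughly the following (a sketch with made-up sample rows, not the processor's actual implementation):

```python
# 'in' filter semantics: keep a row if it matches any of the listed conditions.
conditions = [{'CommitteeSessionID': 2063122}, {'CommitteeSessionID': 2063126}]

def keep(row):
    return any(all(row.get(k) == v for k, v in cond.items())
               for cond in conditions)

rows = [
    {'CommitteeSessionID': 2063122, 'GroupTypeID': 23},
    {'CommitteeSessionID': 999999, 'GroupTypeID': 23},
    {'CommitteeSessionID': 2063126, 'GroupTypeID': 23},
]
filtered = [r for r in rows if keep(r)]
print(len(filtered))  # 2
```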

Start a Tika server for parsing .doc / .docx files:

docker run --rm -p 9998:9998 logicalspark/docker-tikaserver

Check that the Tika server is accessible from the notebook environment:


In [ ]:
%%bash
curl 172.17.0.1:9998 | tail

Run the pipeline, setting the TIKA_SERVER_ENDPOINT environment variable to the correct value:


In [3]:
!{'cd /pipelines; TIKA_SERVER_ENDPOINT=http://172.17.0.1:9998 dpp run --verbose ./committees/kns_documentcommitteesession'}


[./committees/kns_documentcommitteesession:T_0] >>> INFO    :21d7377f RUNNING ./committees/kns_documentcommitteesession
[./committees/kns_documentcommitteesession:T_0] >>> INFO    :21d7377f Collecting dependencies
[./committees/kns_documentcommitteesession:T_0] >>> INFO    :21d7377f Running async task
[./committees/kns_documentcommitteesession:T_0] >>> INFO    :21d7377f Waiting for completion
[./committees/kns_documentcommitteesession:T_0] >>> INFO    :21d7377f Async task starting
[./committees/kns_documentcommitteesession:T_0] >>> INFO    :21d7377f Searching for existing caches
[./committees/kns_documentcommitteesession:T_0] >>> INFO    :Found cache for step 2: filter
[./committees/kns_documentcommitteesession:T_0] >>> INFO    :21d7377f Building process chain:
[./committees/kns_documentcommitteesession:T_0] >>> INFO    :- cache_loader
[./committees/kns_documentcommitteesession:T_0] >>> INFO    :- download_document_committee_session
[./committees/kns_documentcommitteesession:T_0] >>> INFO    :- parse_meeting_protocols
[./committees/kns_documentcommitteesession:T_0] >>> INFO    :- parse_meeting_protocols
[./committees/kns_documentcommitteesession:T_0] >>> INFO    :- knesset.dump_to_path
[./committees/kns_documentcommitteesession:T_0] >>> INFO    :- knesset.dump_to_sql
[./committees/kns_documentcommitteesession:T_0] >>> INFO    :- (sink)
[./committees/kns_documentcommitteesession:T_0] >>> INFO    :download_document_committee_session: INFO    :Processed 2 rows
[./committees/kns_documentcommitteesession:T_0] >>> INFO    :21d7377f DONE /usr/local/lib/python3.6/site-packages/datapackage_pipelines/specs/../lib/cache_loader.py
[./committees/kns_documentcommitteesession:T_0] >>> INFO    :21d7377f DONE /pipelines/committees/download_document_committee_session.py
[./committees/kns_documentcommitteesession:T_0] >>> INFO    :parse_meeting_protocols: INFO    :Processed 2 rows
[./committees/kns_documentcommitteesession:T_0] >>> INFO    :parse_meeting_protocols: INFO    :Processed 2 rows
[./committees/kns_documentcommitteesession:T_0] >>> INFO    :21d7377f DONE /pipelines/committees/parse_meeting_protocols.py
[./committees/kns_documentcommitteesession:T_0] >>> INFO    :knesset.dump_to_path: INFO    :Processed 2 rows
[./committees/kns_documentcommitteesession:T_0] >>> INFO    :knesset.dump_to_sql: INFO    :Processed 2 rows
[./committees/kns_documentcommitteesession:T_0] >>> INFO    :21d7377f DONE /pipelines/committees/parse_meeting_protocols.py
[./committees/kns_documentcommitteesession:T_0] >>> INFO    :21d7377f DONE /pipelines/datapackage_pipelines_knesset/processors/dump_to_path.py
[./committees/kns_documentcommitteesession:T_0] >>> INFO    :21d7377f DONE /usr/local/lib/python3.6/site-packages/datapackage_pipelines/manager/../lib/internal/sink.py
[./committees/kns_documentcommitteesession:T_0] >>> INFO    :21d7377f DONE /pipelines/datapackage_pipelines_knesset/processors/dump_to_sql.py
[./committees/kns_documentcommitteesession:T_0] >>> INFO    :21d7377f DONE V ./committees/kns_documentcommitteesession {'.dpp': {'out-datapackage-url': '../data/committees/kns_documentcommitteesession/datapackage.json'}, 'bytes': None, 'count_of_rows': 2, 'dataset_name': '_', 'download: downloaded files': 0, 'download: errored files': 0, 'download: existing files': 2, 'download: skipped files': 0, 'hash': '141945165787fffea9cf09d54c201a18', 'parts: errored files': 0, 'parts: existing files': 2, 'parts: missing download files': 0, 'parts: parsed files': 0, 'parts: skipped files': 0, 'text: errored files': 0, 'text: existing files': 2, 'text: missing download files': 0, 'text: parsed files': 0, 'text: skipped files': 0}
INFO    :RESULTS:
INFO    :SUCCESS: ./committees/kns_documentcommitteesession {'bytes': None, 'count_of_rows': 2, 'dataset_name': '_', 'download: downloaded files': 0, 'download: errored files': 0, 'download: existing files': 2, 'download: skipped files': 0, 'hash': '141945165787fffea9cf09d54c201a18', 'parts: errored files': 0, 'parts: existing files': 2, 'parts: missing download files': 0, 'parts: parsed files': 0, 'parts: skipped files': 0, 'text: errored files': 0, 'text: existing files': 2, 'text: missing download files': 0, 'text: parsed files': 0, 'text: skipped files': 0}
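The stats dict in the SUCCESS line is useful for confirming that everything was served from existing files rather than re-downloaded or re-parsed. A small hypothetical helper, using the counters copied from the run above:

```python
# Stats reported by the pipeline run above (copied from the SUCCESS line).
stats = {
    'count_of_rows': 2,
    'download: downloaded files': 0, 'download: errored files': 0,
    'download: existing files': 2, 'download: skipped files': 0,
    'parts: errored files': 0, 'parts: existing files': 2,
    'parts: missing download files': 0, 'parts: parsed files': 0,
    'parts: skipped files': 0,
    'text: errored files': 0, 'text: existing files': 2,
    'text: missing download files': 0, 'text: parsed files': 0,
    'text: skipped files': 0,
}

def all_from_cache(stats):
    # No new downloads, no new parses, no errors - all work was reused.
    return (stats['download: downloaded files'] == 0
            and stats['parts: parsed files'] == 0
            and stats['text: parsed files'] == 0
            and all(v == 0 for k, v in stats.items() if 'errored' in k))

print(all_from_cache(stats))  # True
```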

Inspect the output


In [9]:
from dataflows import Flow, load, printer
documentcommitteesessions = Flow(load('/pipelines/data/committees/kns_documentcommitteesession/datapackage.json')).results()[0][0]

In [19]:
session_id = 2063122
session = [session for session in documentcommitteesessions if session['CommitteeSessionID'] == session_id][0]
session_files = {
    'download': '/pipelines/data/committees/download_document_committee_session/'+session['download_filename'],
    'text': '/pipelines/data/committees/meeting_protocols_text/'+session['text_parsed_filename'],
    'text_hash': '/pipelines/data/committees/meeting_protocols_text/'+session['text_parsed_filename']+'.hash',
    'parts': '/pipelines/data/committees/meeting_protocols_parts/'+session['parts_parsed_filename'],
    'parts_hash': '/pipelines/data/committees/meeting_protocols_parts/'+session['parts_parsed_filename']+'.hash',
}
session_files


Out[19]:
{'download': '/pipelines/data/committees/download_document_committee_session/files/23/4/3/434231.DOC',
 'text': '/pipelines/data/committees/meeting_protocols_text/files/2/0/2063122.txt',
 'text_hash': '/pipelines/data/committees/meeting_protocols_text/files/2/0/2063122.txt.hash',
 'parts': '/pipelines/data/committees/meeting_protocols_parts/files/2/0/2063122.csv',
 'parts_hash': '/pipelines/data/committees/meeting_protocols_parts/files/2/0/2063122.csv.hash'}
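Judging from the output above, the text and parts files appear to be sharded into subdirectories by the session ID's first two digits (e.g. 2063122 -> files/2/0/2063122.txt). A self-contained sketch of that path construction; the layout is inferred from the output, not a documented API, and the download files follow a different scheme:

```python
import os

def protocol_path(base, session_id, ext):
    # Shard by the ID's first two digits: 2063122 -> files/2/0/2063122.txt
    # (inferred from the session_files output above; an assumption).
    s = str(session_id)
    return os.path.join(base, 'files', s[0], s[1], s + ext)

print(protocol_path('/pipelines/data/committees/meeting_protocols_text',
                    2063122, '.txt'))
# /pipelines/data/committees/meeting_protocols_text/files/2/0/2063122.txt
```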

In [25]:
import subprocess

for k, v in session_files.items():
    print(subprocess.check_output('ls -lah '+v, shell=True).decode().strip())


-rw-r--r--    1 root     root       46.0K Oct 15 06:17 /pipelines/data/committees/download_document_committee_session/files/23/4/3/434231.DOC
-rw-r--r--    1 root     root       83.1K Oct 15 18:02 /pipelines/data/committees/meeting_protocols_text/files/2/0/2063122.txt
-rw-r--r--    1 root     root          32 Oct 15 18:02 /pipelines/data/committees/meeting_protocols_text/files/2/0/2063122.txt.hash
-rw-r--r--    1 root     root       83.1K Oct 15 18:02 /pipelines/data/committees/meeting_protocols_parts/files/2/0/2063122.csv
-rw-r--r--    1 root     root          32 Oct 15 18:02 /pipelines/data/committees/meeting_protocols_parts/files/2/0/2063122.csv.hash
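Each output file has a companion .hash file of 32 bytes, which looks like an MD5 hex digest of the file (an assumption based on the size, not confirmed from the pipeline code). A sketch of verifying a file against its .hash, demonstrated on a temporary file rather than the real protocol files:

```python
import hashlib
import os
import tempfile

def file_md5(path):
    # Stream the file in chunks to avoid loading it all into memory.
    h = hashlib.md5()
    with open(path, 'rb') as f:
        for chunk in iter(lambda: f.read(65536), b''):
            h.update(chunk)
    return h.hexdigest()

def verify(path):
    # Compare the file's digest to the companion .hash file.
    with open(path + '.hash') as f:
        return f.read().strip() == file_md5(path)

# Demo on a temporary file (the real files live under /pipelines/data).
with tempfile.TemporaryDirectory() as d:
    p = os.path.join(d, '2063122.txt')
    with open(p, 'w') as f:
        f.write('protocol text')
    with open(p + '.hash', 'w') as f:
        f.write(file_md5(p))
    print(verify(p))  # True
```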